package org.example.dobs.demo.flink.wc.transform;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import source.custom.MySourceFunction_V1;

/**
 * union的两个流的数据类型，必须一致
 */
public class UnionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        see.setParallelism(1);

        DataStreamSource<Long> inputDataStream1 = see.addSource(new MySourceFunction_V1());
        DataStreamSource<Long> inputDataStream2 = see.addSource(new MySourceFunction_V1());
        //inputDataStream1和inputDataStream2类型必须一致
        DataStream<Long> unionDataStream = inputDataStream1.union(inputDataStream2);
        SingleOutputStreamOperator<Long> dataStream = unionDataStream.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long aLong) throws Exception {
                System.out.println("处理数据：" + aLong);
                return aLong;
            }
        });
        SingleOutputStreamOperator<Long> sum = dataStream.timeWindowAll(Time.seconds(2)).sum(0);
        sum.print();

        see.execute(UnionDemo.class.getSimpleName());
    }

}
